@@ -67,6 +67,13 @@ public class HoodieBootstrapConfig extends HoodieConfig {
.sinceVersion("0.6.0")
.withDocumentation("Selects the mode in which each file/partition in the bootstrapped dataset gets bootstrapped");

public static final ConfigProperty<String> DATA_QUERIES_ONLY = ConfigProperty
.key("hoodie.bootstrap.data.queries.only")
.defaultValue("false")
.markAdvanced()
.sinceVersion("0.14.0")
.withDocumentation("Improves query performance, but queries cannot use hudi metadata fields");

public static final ConfigProperty<String> FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME = ConfigProperty
.key("hoodie.bootstrap.full.input.provider")
.defaultValue("org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider")
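The new flag is consumed as an ordinary datasource read option. A minimal usage sketch, assuming an active SparkSession named spark and an already bootstrapped table at bootstrapTablePath (both hypothetical names); per the documentation above, the fast path trades away access to the Hudi metadata fields:

// Sketch only: spark and bootstrapTablePath are assumed to exist.
val fastDf = spark.read.format("hudi")
  .option("hoodie.bootstrap.data.queries.only", "true")
  .load(bootstrapTablePath)
// With the flag on, the returned schema carries only data columns,
// so meta fields such as _hoodie_commit_time are not queryable.
assert(!fastDf.schema.fieldNames.contains("_hoodie_commit_time"))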
@@ -2851,6 +2851,11 @@ public Builder withAllowMultiWriteOnSameInstant(boolean allow) {
return this;
}

public Builder withHiveStylePartitioningEnabled(boolean enabled) {
writeConfig.setValue(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE, String.valueOf(enabled));
return this;
}

public Builder withExternalSchemaTrasformation(boolean enabled) {
writeConfig.setValue(AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE, String.valueOf(enabled));
return this;
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ConfigUtils
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MODE
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.util.PathUtils
@@ -101,7 +102,8 @@ class DefaultSource extends RelationProvider
)
} else {
Map()
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams)
}) ++ DataSourceOptionsHelper.parametersWithReadDefaults(optParams +
(DATA_QUERIES_ONLY.key() -> sqlContext.getConf(DATA_QUERIES_ONLY.key(), optParams.getOrElse(DATA_QUERIES_ONLY.key(), DATA_QUERIES_ONLY.defaultValue()))))

// Get the table base path
val tablePath = if (globPaths.nonEmpty) {
@@ -262,7 +264,7 @@ object DefaultSource {
new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)

case (_, _, true) =>
new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)
resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, metaClient, parameters)

case (_, _, _) =>
throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," +
@@ -271,6 +273,24 @@
}
}

private def resolveHoodieBootstrapRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
metaClient: HoodieTableMetaClient,
parameters: Map[String, String]): BaseRelation = {
val enableFileIndex = HoodieSparkConfUtils.getConfigValue(parameters, sqlContext.sparkSession.sessionState.conf,
ENABLE_HOODIE_FILE_INDEX.key, ENABLE_HOODIE_FILE_INDEX.defaultValue.toString).toBoolean
val isSchemaEvolutionEnabledOnRead = HoodieSparkConfUtils.getConfigValue(parameters,
sqlContext.sparkSession.sessionState.conf, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key,
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean
if (!enableFileIndex || isSchemaEvolutionEnabledOnRead
|| globPaths.nonEmpty || !parameters.getOrElse(DATA_QUERIES_ONLY.key, DATA_QUERIES_ONLY.defaultValue).toBoolean) {
Contributor: Can you explain why globPaths.nonEmpty is included here? Not following it.

Contributor: Also, how are we ensuring that for MOR, the behavior is unchanged?

Contributor Author: To answer your first question: I got that condition from BaseFileOnlyRelation.toHadoopFsRelation. For the second question, I need to go through today and update the existing bootstrap tests.

Contributor Author: Looking at the existing testing for bootstrap, there are probably a lot of cases that we are not testing currently. It doesn't seem like we support MOR with bootstrap very well: https://issues.apache.org/jira/browse/HUDI-2071
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters + (DATA_QUERIES_ONLY.key() -> "false"))
} else {
HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters).toHadoopFsRelation
}
}

private def resolveBaseFileOnlyRelation(sqlContext: SQLContext,
globPaths: Seq[Path],
userSchema: Option[StructType],
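Since DefaultSource now resolves the flag from the SQL conf first, then the per-read option, then the config default, it can also be toggled session-wide. A sketch under the same hypothetical names as before:

// Session-wide toggle (sketch). Subsequent reads of a bootstrapped table take the
// HadoopFsRelation fast path unless glob paths are supplied, schema evolution on
// read is enabled, or the Hudi file index is disabled; in those cases the code
// above falls back to HoodieBootstrapRelation with the flag forced to false.
spark.sql("set hoodie.bootstrap.data.queries.only=true")
val df = spark.read.format("hudi").load(bootstrapTablePath)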
@@ -38,6 +38,7 @@ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, T
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hadoop.CachingPath
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -225,7 +226,9 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
val shouldUseBootstrapFastRead = optParams.getOrElse(DATA_QUERIES_ONLY.key(), "false").toBoolean

shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || shouldUseBootstrapFastRead
}

/**
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionedFile}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
@@ -200,6 +200,16 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
override def updatePrunedDataSchema(prunedSchema: StructType): HoodieBootstrapRelation =
this.copy(prunedDataSchema = Some(prunedSchema))

def toHadoopFsRelation: HadoopFsRelation = {
HadoopFsRelation(
location = fileIndex,
partitionSchema = fileIndex.partitionSchema,
dataSchema = fileIndex.dataSchema,
bucketSpec = None,
fileFormat = fileFormat,
optParams)(sparkSession)
}

//TODO: This should be unnecessary with spark 3.4 [SPARK-41970]
private def encodePartitionPath(file: FileStatus): String = {
val tablePathWithoutScheme = CachingPath.getPathWithoutSchemeAndAuthority(bootstrapBasePath)
@@ -212,7 +222,6 @@


object HoodieBootstrapRelation {

private def validate(requiredDataSchema: HoodieTableSchema, requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType): Unit = {
val requiredDataColumns: Seq[String] = requiredDataSchema.structTypeSchema.fieldNames.toSeq
val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ requiredDataFileSchema.fieldNames).toSeq
@@ -22,6 +22,7 @@ import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferenc
import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.HoodieBaseFile
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
@@ -41,6 +42,7 @@ import org.apache.spark.unsafe.types.UTF8String
import java.text.SimpleDateFormat
import javax.annotation.concurrent.NotThreadSafe
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

@@ -145,11 +147,10 @@ case class HoodieFileIndex(spark: SparkSession,
val prunedPartitions = listMatchingPartitionPaths(partitionFilters)
val listedPartitions = getInputFileSlices(prunedPartitions: _*).asScala.toSeq.map {
case (partition, fileSlices) =>
val baseFileStatuses: Seq[FileStatus] =
fileSlices.asScala
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null)
.map(_.getFileStatus)
val baseFileStatuses: Seq[FileStatus] = getBaseFileStatus(fileSlices
.asScala
.map(fs => fs.getBaseFile.orElse(null))
.filter(_ != null))

// Filter in candidate files based on the col-stats index lookup
val candidateFiles = baseFileStatuses.filter(fs =>
@@ -179,6 +180,23 @@
}
}

/**
* In the fast bootstrap read code path, it gets the file status for the bootstrap base files instead of
* skeleton files.
*/
private def getBaseFileStatus(baseFiles: mutable.Buffer[HoodieBaseFile]): mutable.Buffer[FileStatus] = {
if (shouldFastBootstrap) {
baseFiles.map(f =>
if (f.getBootstrapBaseFile.isPresent) {
f.getBootstrapBaseFile.get().getFileStatus
} else {
f.getFileStatus
})
} else {
baseFiles.map(_.getFileStatus)
}
}

private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
val allBaseFileNames = allFiles.map(f => f.getPath.getName).toSet
allBaseFileNames -- allIndexedFileNames
@@ -23,11 +23,12 @@ import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.SparkHoodieTableFileIndex._
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.bootstrap.index.BootstrapIndex
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.model.HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION
import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY
import org.apache.hudi.hadoop.CachingPath
import org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe
import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
@@ -40,7 +41,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.{InternalRow, expressions}
import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ByteType, DataType, DateType, IntegerType, LongType, ShortType, StringType, StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import javax.annotation.concurrent.NotThreadSafe
@@ -83,10 +84,18 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
/**
* Get the schema of the table.
*/
lazy val schema: StructType = schemaSpec.getOrElse({
val schemaUtil = new TableSchemaResolver(metaClient)
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})
lazy val schema: StructType = if (shouldFastBootstrap) {
StructType(rawSchema.fields.filterNot(f => HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name)))
} else {
rawSchema
}

private lazy val rawSchema: StructType = schemaSpec.getOrElse({
val schemaUtil = new TableSchemaResolver(metaClient)
AvroConversionUtils.convertAvroSchemaToStructType(schemaUtil.getTableAvroSchema)
})

protected lazy val shouldFastBootstrap = configProperties.getBoolean(DATA_QUERIES_ONLY.key, false)

private lazy val sparkParsePartitionUtil = sparkAdapter.getSparkParsePartitionUtil

@@ -110,7 +119,7 @@
.map(column => nameFieldMap.apply(column))

if (partitionFields.size != partitionColumns.get().size) {
val isBootstrapTable = BootstrapIndex.getBootstrapIndex(metaClient).useIndex()
val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
if (isBootstrapTable) {
// For bootstrapped tables its possible the schema does not contain partition field when source table
// is hive style partitioned. In this case we would like to treat the table as non-partitioned
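The schema override in SparkHoodieTableFileIndex strips Hudi meta columns whenever shouldFastBootstrap is set. A standalone illustration of that filter, assuming the customary meta field names (the production code takes the list from HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION):

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Assumed meta column names, for illustration only.
val metaColumns = Set("_hoodie_commit_time", "_hoodie_commit_seqno",
  "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name", "_hoodie_operation")

val rawSchema = StructType(Seq(
  StructField("_hoodie_commit_time", StringType),
  StructField("_hoodie_record_key", StringType),
  StructField("_row_key", StringType),
  StructField("timestamp", LongType)))

// Same filterNot pattern as the diff: only data columns remain in fast bootstrap mode.
val dataOnlySchema = StructType(rawSchema.fields.filterNot(f => metaColumns.contains(f.name)))
// dataOnlySchema.fieldNames: Array(_row_key, timestamp)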
@@ -54,6 +54,7 @@
import java.util.stream.Stream;

import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
import static org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY;
import static org.junit.jupiter.api.Assertions.assertEquals;

/**
@@ -230,12 +231,16 @@ protected void compareTables() {
}
Dataset<Row> hudiDf = sparkSession.read().options(readOpts).format("hudi").load(hudiBasePath);
Dataset<Row> bootstrapDf = sparkSession.read().format("hudi").load(bootstrapTargetPath);
Dataset<Row> fastBootstrapDf = sparkSession.read().format("hudi").option(DATA_QUERIES_ONLY.key(), "true").load(bootstrapTargetPath);
if (nPartitions == 0) {
compareDf(fastBootstrapDf.drop("city_to_state"), bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path"));
compareDf(hudiDf.drop(dropColumns), bootstrapDf.drop(dropColumns));
return;
}
compareDf(hudiDf.drop(dropColumns).drop(partitionCols), bootstrapDf.drop(dropColumns).drop(partitionCols));
compareDf(fastBootstrapDf.drop("city_to_state").drop(partitionCols), bootstrapDf.drop(dropColumns).drop("_hoodie_partition_path").drop(partitionCols));
compareDf(hudiDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols));
compareDf(fastBootstrapDf.select("_row_key",partitionCols), bootstrapDf.select("_row_key",partitionCols));
}

protected void compareDf(Dataset<Row> df1, Dataset<Row> df2) {
@@ -583,10 +583,12 @@ class TestDataSourceForBootstrap {
assertEquals(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, commitInstantTime1)

// Read bootstrapped table and verify count
val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
val hoodieROViewDF1 = spark.read.format("hudi")
.option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath + "/*")
assertEquals(sort(sourceDF).collectAsList(), sort(dropMetaCols(hoodieROViewDF1)).collectAsList())

val hoodieROViewDFWithBasePath = spark.read.format("hudi").load(basePath)
val hoodieROViewDFWithBasePath = spark.read.format("hudi")
.option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath)
assertEquals(sort(sourceDF).collectAsList(), sort(dropMetaCols(hoodieROViewDFWithBasePath)).collectAsList())

// Perform upsert
@@ -606,7 +608,8 @@
assertEquals(1, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, commitInstantTime1).size())

// Read table after upsert and verify count
val hoodieROViewDF2 = spark.read.format("hudi").load(basePath + "/*")
val hoodieROViewDF2 = spark.read.format("hudi")
.option(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key(), "true").load(basePath + "/*")
assertEquals(numRecords, hoodieROViewDF2.count())
assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count())
