Merged

24 commits
57755d3  Removed unnecessary Catalyst projection (Apr 26, 2022)
01a54f1  Cleaning up naming for Catalyst expression utils (Apr 26, 2022)
c77a31c  Employ Catalyst's `UnsafeProjection` when projecting records w/o updates (Apr 26, 2022)
502e93e  Consolidated Avro records projections w/in `SafeAvroProjection` (Apr 26, 2022)
443ee26  Do unsafe Avro projection after merging to avoid `AvroSafeProjection`… (Apr 26, 2022)
ca3024a  Fixed handling of partition schema extraction for both COW/MOR relati… (May 27, 2022)
b0e9209  Fixed `BaseFileOnlyRelation` (May 27, 2022)
9b5f3cd  Fixed `MergeOnReadSnapshotRelation` (May 27, 2022)
6ffb5aa  Abstracted base-file readers creation for MOR relation behind a singl… (May 28, 2022)
2a63374  Rebased `MergeOnReadIncrementalRelation` onto new `createBaseFileRead… (May 28, 2022)
be05a41  Fixing compilation (May 28, 2022)
eb5f747  Fixing compilation (Jul 16, 2022)
f9beb7e  Typo (Jul 19, 2022)
b29986e  Fixing compilation (Jul 20, 2022)
807927b  Converted `BaseFileReader` hierarchy to be case-classes (for serializ… (Jul 20, 2022)
b450eec  Fixed base-file reader creation seq to properly project into the requ… (Jul 22, 2022)
5b6281e  Delineate clearly data-file schema (Jul 22, 2022)
1f3cd58  Fixed `SkipMergeIterator` to properly project into required schema (Jul 22, 2022)
d66c47c  Fixed MOR relations to delineate schemas w/in `createBaseFileReaders` (Jul 22, 2022)
8f0c173  Revisited projecting into required-schema to only occur w/in the RDD … (Jul 22, 2022)
d9be73d  Tidying up (Jul 22, 2022)
381d9af  Tidying up (Jul 22, 2022)
4865e96  Reverting accidental change (Jul 22, 2022)
e121b00  Fixing HFile reader creation to match other file-formats; (Jul 22, 2022)
@@ -45,7 +45,7 @@ trait SparkAdapter extends Serializable {
* Creates instance of [[HoodieCatalystExpressionUtils]] providing for common utils operating
* on Catalyst [[Expression]]s
*/
def getCatalystExpressionUtils(): HoodieCatalystExpressionUtils
def getCatalystExpressionUtils: HoodieCatalystExpressionUtils

/**
* Creates instance of [[HoodieCatalystPlansUtils]] providing for common utils operating
@@ -20,11 +20,13 @@ package org.apache.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.HoodieBaseRelation.projectReader
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -68,25 +70,34 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
}

protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD = {
filters: Array[Filter]): RDD[InternalRow] = {
val (partitionSchema, dataSchema, requiredDataSchema) =
tryPrunePartitionColumns(tableSchema, requiredSchema)

val baseFileReader = createBaseFileReader(
spark = sparkSession,
partitionSchema = partitionSchema,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
requiredDataSchema = requiredDataSchema,
filters = filters,
options = optParams,
// NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
// to configure Parquet reader appropriately
hadoopConf = embedInternalSchema(new Configuration(conf), requiredSchema.internalSchema)
)

new HoodieFileScanRDD(sparkSession, baseFileReader, fileSplits)
// NOTE: In some case schema of the reader's output (reader's schema) might not match the schema expected by the caller.
// This could occur for ex, when requested schema contains partition columns which might not be persisted w/in the
// data file, but instead would be parsed from the partition path. In that case output of the file-reader will have
// different ordering of the fields than the original required schema (for more details please check out
// [[ParquetFileFormat]] impl). In that case we have to project the rows from the file-reader's schema
// back into the one expected by the caller
val projectedReader = projectReader(baseFileReader, requiredSchema.structTypeSchema)

new HoodieFileScanRDD(sparkSession, projectedReader.apply, fileSplits)
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
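The NOTE above is the crux of the change in this relation: when partition values are extracted from the partition path, Spark's `ParquetFileFormat` appends them after the data columns, so the reader's output ordering can differ from the schema the caller asked for. A minimal, self-contained sketch of that reordering with an `UnsafeProjection` (the schemas and column names are illustrative, not Hudi's; the rebind-by-name logic approximates what `generateUnsafeProjection` is assumed to do):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Caller asked for (dt, id, value): the partition column comes first
val requiredSchema = StructType(Seq(
  StructField("dt", StringType), StructField("id", IntegerType), StructField("value", LongType)))

// The file reader emits data columns first and appends the partition column last
val readerSchema = StructType(Seq(
  StructField("id", IntegerType), StructField("value", LongType), StructField("dt", StringType)))

// Rebind the required columns against the reader's output by name
val readerAttrs = readerSchema.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
val projection = UnsafeProjection.create(
  requiredSchema.map(f => readerAttrs.find(_.name == f.name).get), readerAttrs)

val readerRow: InternalRow = InternalRow(42, 100L, UTF8String.fromString("2022-07-22"))
val projected = projection(readerRow)
// Columns are back in the caller-requested order: (dt, id, value)
println((projected.getUTF8String(0), projected.getInt(1), projected.getLong(2)))
```

In the relation above, `projectReader` performs this rebinding once per base-file reader, so `HoodieFileScanRDD` always emits rows in `requiredSchema` order.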
@@ -45,7 +45,7 @@ import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
@@ -274,7 +274,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
def canPruneRelationSchema: Boolean =
(fileFormat.isInstanceOf[ParquetFileFormat] || fileFormat.isInstanceOf[OrcFileFormat]) &&
// NOTE: Some relations might be disabling sophisticated schema pruning techniques (for ex, nested schema pruning)
// TODO(HUDI-XXX) internal schema doesn't supported nested schema pruning currently
// TODO(HUDI-XXX) internal schema doesn't support nested schema pruning currently
Review comment (Contributor): Pls raise a PR, and I will repair it later

Reply (Contributor Author): You mean, JIRA, right? Will do

!hasSchemaOnRead

override def schema: StructType = {
@@ -334,58 +334,32 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchemaStr, internalSchemaOpt)
val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString, Some(requiredInternalSchema))

// Since schema requested by the caller might contain partition columns, we might need to
// prune it, removing all partition columns from it in case these columns are not persisted
// in the data files
//
// NOTE: This partition schema is only relevant to file reader to be able to embed
// values of partition columns (hereafter referred to as partition values) encoded into
// the partition path, and omitted from the data file, back into fetched rows;
// Note that, by default, partition columns are not omitted therefore specifying
// partition schema for reader is not required
val (partitionSchema, dataSchema, requiredDataSchema) =
tryPrunePartitionColumns(tableSchema, requiredSchema)

if (fileSplits.isEmpty) {
sparkSession.sparkContext.emptyRDD
} else {
val rdd = composeRDD(fileSplits, partitionSchema, dataSchema, requiredDataSchema, targetColumns, filters)

// NOTE: In case when partition columns have been pruned from the required schema, we have to project
// the rows from the pruned schema back into the one expected by the caller
val projectedRDD = if (requiredDataSchema.structTypeSchema != requiredSchema.structTypeSchema) {
rdd.mapPartitions { it =>
val fullPrunedSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)
val unsafeProjection = generateUnsafeProjection(fullPrunedSchema, requiredSchema.structTypeSchema)
it.map(unsafeProjection)
}
} else {
rdd
}
val rdd = composeRDD(fileSplits, tableSchema, requiredSchema, targetColumns, filters)

// Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]]
// Please check [[needConversion]] scala-doc for more details
projectedRDD.asInstanceOf[RDD[Row]]
rdd.asInstanceOf[RDD[Row]]
}
}

/**
* Composes RDD provided file splits to read from, table and partition schemas, data filters to be applied
*
* @param fileSplits file splits to be handled by the RDD
* @param partitionSchema target table's partition schema
* @param dataSchema target table's data files' schema
* @param tableSchema target table's schema
* @param requiredSchema projected schema required by the reader
* @param requestedColumns columns requested by the query
* @param filters data filters to be applied
* @return instance of RDD (implementing [[HoodieUnsafeRDD]])
* @return instance of RDD (holding [[InternalRow]]s)
*/
protected def composeRDD(fileSplits: Seq[FileSplit],
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requestedColumns: Array[String],
filters: Array[Filter]): HoodieUnsafeRDD
filters: Array[Filter]): RDD[InternalRow]

/**
* Provided with partition and date filters collects target file splits to read records from, while
@@ -553,7 +527,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
protected def createBaseFileReader(spark: SparkSession,
partitionSchema: StructType,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requiredDataSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): BaseFileReader = {
@@ -564,42 +538,56 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
// we have to eagerly initialize all of the readers even though only one specific to the type
// of the file being read will be used. This is required to avoid serialization of the whole
// relation (containing file-index for ex) and passing it to the executor
val reader = tableBaseFileFormat match {
case HoodieFileFormat.PARQUET =>
HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
dataSchema = dataSchema.structTypeSchema,
partitionSchema = partitionSchema,
requiredSchema = requiredSchema.structTypeSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf,
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
)
val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) =
tableBaseFileFormat match {
case HoodieFileFormat.PARQUET =>
val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
sparkSession = spark,
dataSchema = dataSchema.structTypeSchema,
partitionSchema = partitionSchema,
requiredSchema = requiredDataSchema.structTypeSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf,
// We're delegating to Spark to append partition values to every row only in cases
// when these corresponding partition-values are not persisted w/in the data file itself
appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath
)
// Since partition values by default are omitted, and not persisted w/in data-files by Spark,
// data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading
// the data. As such, actual full schema produced by such reader is composed of
// a) Data-file schema (projected or not)
// b) Appended partition column values
val readerSchema = StructType(requiredDataSchema.structTypeSchema.fields ++ partitionSchema.fields)

(parquetReader, readerSchema)

case HoodieFileFormat.HFILE =>
createHFileReader(
val hfileReader = createHFileReader(
spark = spark,
dataSchema = dataSchema,
requiredSchema = requiredSchema,
requiredDataSchema = requiredDataSchema,
filters = filters,
options = options,
hadoopConf = hadoopConf
)

(hfileReader, requiredDataSchema.structTypeSchema)

case _ => throw new UnsupportedOperationException(s"Base file format is not currently supported ($tableBaseFileFormat)")
}

partitionedFile => {
val extension = FSUtils.getFileExtension(partitionedFile.filePath)
if (tableBaseFileFormat.getFileExtension.equals(extension)) {
reader.apply(partitionedFile)
} else {
throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)")
}
}
BaseFileReader(
read = partitionedFile => {
val extension = FSUtils.getFileExtension(partitionedFile.filePath)
if (tableBaseFileFormat.getFileExtension.equals(extension)) {
read(partitionedFile)
} else {
throw new UnsupportedOperationException(s"Invalid base-file format ($extension), expected ($tableBaseFileFormat)")
}
},
schema = schema
)
}
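The comment above spells out that the reader's actual output schema is the (possibly projected) data-file schema followed by the appended partition columns. Spark's `FileFormat.buildReaderWithPartitionValues` roughly joins each data row with the file's partition values; a simplified sketch of that composition (not the actual Spark or Hudi code, schemas are illustrative):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

// Schema actually read from the data file (partition column is not stored in it)
val requiredDataSchema = StructType(Seq(
  StructField("id", IntegerType), StructField("value", LongType)))
// Partition columns recovered from the partition path
val partitionSchema = StructType(Seq(StructField("dt", StringType)))

// Full schema the reader ends up producing: data columns first, partition columns appended
val readerSchema = StructType(requiredDataSchema.fields ++ partitionSchema.fields)

val dataRow         = InternalRow(1, 10L)
val partitionValues = InternalRow(UTF8String.fromString("2022-07-22"))
val fullRow: InternalRow = new JoinedRow(dataRow, partitionValues)
// fullRow lines up with readerSchema: (id, value, dt)
println((fullRow.getInt(0), fullRow.getLong(1), fullRow.getUTF8String(2)))
```

This is why the Parquet branch reports `readerSchema` rather than `requiredDataSchema` as the `BaseFileReader` schema, while the HFile branch, which never appends partition values, reports `requiredDataSchema` unchanged.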

protected def embedInternalSchema(conf: Configuration, internalSchemaOpt: Option[InternalSchema]): Configuration = {
@@ -615,8 +603,17 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
conf
}

private def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = {
// Since schema requested by the caller might contain partition columns, we might need to
// prune it, removing all partition columns from it in case these columns are not persisted
// in the data files
//
// NOTE: This partition schema is only relevant to file reader to be able to embed
// values of partition columns (hereafter referred to as partition values) encoded into
// the partition path, and omitted from the data file, back into fetched rows;
// Note that, by default, partition columns are not omitted therefore specifying
// partition schema for reader is not required
if (shouldExtractPartitionValuesFromPartitionPath) {
val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema)
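For intuition, a small sketch of what the pruning above produces when `shouldExtractPartitionValuesFromPartitionPath` is true: partition columns are dropped from both the table and required schemas and modeled as a separate string-typed partition schema (schemas below are illustrative; the real method also carries the Avro and internal-schema counterparts inside `HoodieTableSchema`):

```scala
import org.apache.spark.sql.types._

val partitionColumns = Seq("dt")

val tableSchema = StructType(Seq(
  StructField("id", IntegerType), StructField("value", LongType), StructField("dt", StringType)))
val requiredSchema = StructType(Seq(
  StructField("dt", StringType), StructField("id", IntegerType)))

// Partition columns are modeled as strings parsed from the partition path
val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType)))
// Data-file schemas are the table/required schemas with the partition columns removed
val dataSchema         = StructType(tableSchema.filterNot(f => partitionColumns.contains(f.name)))
val requiredDataSchema = StructType(requiredSchema.filterNot(f => partitionColumns.contains(f.name)))
// partitionSchema: (dt), dataSchema: (id, value), requiredDataSchema: (id)
```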
@@ -645,17 +642,45 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,

object HoodieBaseRelation extends SparkAdapterSupport {

type BaseFileReader = PartitionedFile => Iterator[InternalRow]
case class BaseFileReader(read: PartitionedFile => Iterator[InternalRow], val schema: StructType) {
def apply(file: PartitionedFile): Iterator[InternalRow] = read.apply(file)
}

private def generateUnsafeProjection(from: StructType, to: StructType) =
sparkAdapter.getCatalystExpressionUtils().generateUnsafeProjection(from, to)
def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection =
sparkAdapter.getCatalystExpressionUtils.generateUnsafeProjection(from, to)

def convertToAvroSchema(structSchema: StructType): Schema =
sparkAdapter.getAvroSchemaConverters.toAvroType(structSchema, nullable = false, "Record")

def getPartitionPath(fileStatus: FileStatus): Path =
fileStatus.getPath.getParent

/**
* Projects provided file reader's output from its original schema, into a [[requiredSchema]]
*
* NOTE: [[requiredSchema]] has to be a proper subset of the file reader's schema
*
* @param reader file reader to be projected
* @param requiredSchema target schema for the output of the provided file reader
*/
def projectReader(reader: BaseFileReader, requiredSchema: StructType): BaseFileReader = {
checkState(reader.schema.fields.toSet.intersect(requiredSchema.fields.toSet).size == requiredSchema.size)

if (reader.schema == requiredSchema) {
reader
} else {
val read = reader.apply(_)
val projectedRead: PartitionedFile => Iterator[InternalRow] = (file: PartitionedFile) => {
// NOTE: Projection is not a serializable object, hence it creation should only happen w/in
// the executor process
val unsafeProjection = generateUnsafeProjection(reader.schema, requiredSchema)
read(file).map(unsafeProjection)
}

BaseFileReader(projectedRead, requiredSchema)
Review comment (Contributor): why we still need requiredSchema?

Reply (Contributor Author): Please check my comment where this method is used for an example: whenever we prune partition columns, ordering of the columns would change (partition ones will be removed and then appended to the resulting schema), therefore without projecting back into the required schema the caller will get a dataset with incorrect ordering of the columns

}
}
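The NOTE inside `projectReader` about building the projection lazily is a serialization concern: the wrapping closure captures only the two `StructType`s (which are serializable), and the non-serializable projection is generated wherever the returned function actually runs, i.e. on the executor. A generic sketch of that pattern, independent of the Hudi types (the rebind-by-name logic is an assumption about what `generateUnsafeProjection` does):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
import org.apache.spark.sql.types.StructType

// Wrap a row-iterator producer so that its output is projected from `from` into `to`.
// Only the two StructTypes are captured by the closure; the UnsafeProjection itself is
// generated lazily, at the call site of the returned function.
def projected[T](read: T => Iterator[InternalRow],
                 from: StructType,
                 to: StructType): T => Iterator[InternalRow] =
  (input: T) => {
    val fromAttrs  = from.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
    val projection = UnsafeProjection.create(to.map(f => fromAttrs.find(_.name == f.name).get), fromAttrs)
    read(input).map(projection)
  }
```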

/**
* Projects provided schema by picking only required (projected) top-level columns from it
*
@@ -666,7 +691,6 @@ object HoodieBaseRelation extends SparkAdapterSupport {
tableSchema match {
case Right(internalSchema) =>
checkState(!internalSchema.isEmptySchema)
// TODO extend pruning to leverage optimizer pruned schema
val prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchema, requiredColumns.toList.asJava)
val requiredAvroSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, "schema")
val requiredStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
@@ -691,10 +715,10 @@

private def createHFileReader(spark: SparkSession,
dataSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
requiredDataSchema: HoodieTableSchema,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): BaseFileReader = {
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
val hadoopConfBroadcast =
spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

@@ -703,10 +727,10 @@
val reader = new HoodieHFileReader[GenericRecord](hadoopConf, new Path(partitionedFile.filePath),
new CacheConfig(hadoopConf))

val requiredRowSchema = requiredSchema.structTypeSchema
val requiredRowSchema = requiredDataSchema.structTypeSchema
// NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] aren't serializable
// to be passed from driver to executor
val requiredAvroSchema = new Schema.Parser().parse(requiredSchema.avroSchemaStr)
val requiredAvroSchema = new Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, requiredRowSchema)

reader.getRecordIterator(requiredAvroSchema).asScala
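The NOTE about parsing the Avro schema inside the per-file function exists because, as the code comment says, Avro `Schema` objects aren't serializable for shipping from driver to executor, so only their JSON form travels in `HoodieTableSchema`. A generic sketch of that pattern (the schema literal below is illustrative):

```scala
import org.apache.avro.Schema

// Driver side: keep only the JSON representation, which is a plain (serializable) String
val avroSchemaJson: String =
  """{"type":"record","name":"Rec","fields":[{"name":"id","type":"int"}]}"""

// Executor side, inside the per-file function: re-parse the schema lazily, once per task
lazy val requiredAvroSchema: Schema = new Schema.Parser().parse(avroSchemaJson)
```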