Merged

Changes from all 26 commits
7706f12
Removing unnecessary wrappers
Dec 19, 2022
200e13d
Tidying up
Dec 19, 2022
764204a
Tidying up `HoodieMergeOnReadRDD`
Dec 20, 2022
392e04c
Deduplicate file-slice listing in Spark's relations
Dec 20, 2022
878e689
Rebased MOR iterators onto lazy projection that avoids projecting whe…
Dec 20, 2022
a57e5b4
Converted all Hudi's Spark relations into case classes;
Dec 21, 2022
ae9165d
Abstracted `BaseMergeOnReadSnapshotRelation` to be able to inherit fr…
Dec 21, 2022
e0392f7
Fixed typo
Dec 21, 2022
4d6eb44
Make sure Hudi relation's schema pruned only once
Dec 21, 2022
ede8bf5
Replaced `SafeAvroProjection` w/ simplified `AvroProjection`
Dec 21, 2022
258ba98
XXX
Dec 21, 2022
e8e4868
Fixing compilation for Spark 2.x
Dec 21, 2022
ebe210f
Re-enable `NestedSchemaPruning` rule
Jan 21, 2023
867081c
Abstracted `isProjectionCompatible` utility to control whether `Neste…
Jan 21, 2023
c726d22
Reverting unnecessary changes;
Jan 21, 2023
5a0b542
Fixing NPE
Jan 21, 2023
be35257
Fixing compilation
Jan 21, 2023
50a5462
Added negative tests for `NestedSchemaPruning`
Jan 21, 2023
106b1e0
Fixing test
Jan 21, 2023
fbd9b56
Tidying up
Jan 21, 2023
4a00a79
Tidying up tests
Jan 21, 2023
90c9106
Combined `generateUnsafeProjection` and `generateLazyProjection` to i…
Jan 21, 2023
8f78916
Combined `create` and `createLazy` for `AvroProjection`s as well
Jan 21, 2023
7e89e71
Fixed tests
Jan 23, 2023
a950ede
Fixed nullability annotations
Jan 24, 2023
cd5bab7
Restoring `CONFIG_INSTANTIATION_LOCK`
Jan 24, 2023
@@ -18,9 +18,12 @@
package org.apache.spark.sql

import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Like, Literal, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, CreateStruct, Expression, GetStructField, Like, Literal, Projection, SubqueryExpression, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}

@@ -92,11 +95,16 @@ object HoodieCatalystExpressionUtils {
* B is a subset of A
*/
def generateUnsafeProjection(from: StructType, to: StructType): UnsafeProjection = {
val attrs = from.toAttributes
val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
val targetExprs = to.fields.map(f => attrsMap(f.name))

UnsafeProjection.create(targetExprs, attrs)
val projection = generateUnsafeProjectionInternal(from, to)
val identical = from == to
// NOTE: Have to use explicit [[Projection]] instantiation to stay compatible w/ Scala 2.11
new UnsafeProjection {
override def apply(row: InternalRow): UnsafeRow =
row match {
case ur: UnsafeRow if identical => ur
case _ => projection(row)
}
}
}
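A rough usage sketch of the combined projection above (the schemas below are hypothetical, not part of this patch): projecting a schema onto itself passes incoming [[UnsafeRow]]s through untouched, while projecting onto a narrower schema still goes through the generated projection.

import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

// Hypothetical schemas: `pruned` keeps a subset of `full`'s fields
val full   = StructType(Seq(StructField("key", StringType), StructField("ts", LongType)))
val pruned = StructType(Seq(StructField("key", StringType)))

// Narrowing projection: rows are rewritten into the pruned schema
val narrowing: UnsafeProjection = generateUnsafeProjection(full, pruned)

// Identity projection: an incoming UnsafeRow is returned as-is, avoiding an extra copy
val passThrough: UnsafeProjection = generateUnsafeProjection(full, full)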

/**
@@ -248,6 +256,14 @@ object HoodieCatalystExpressionUtils {
)
}

private def generateUnsafeProjectionInternal(from: StructType, to: StructType): UnsafeProjection = {
val attrs = from.toAttributes
val attrsMap = attrs.map(attr => (attr.name, attr)).toMap
val targetExprs = to.fields.map(f => attrsMap(f.name))

UnsafeProjection.create(targetExprs, attrs)
}

private def hasUnresolvedRefs(resolvedExpr: Expression): Boolean =
resolvedExpr.collectFirst {
case _: UnresolvedAttribute | _: UnresolvedFunction => true
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.hudi

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema

abstract class AvroProjection extends (GenericRecord => GenericRecord)

object AvroProjection {

/**
* Creates a projection into the provided [[Schema]], allowing a [[GenericRecord]] to be
* converted into the new schema
*/
def create(schema: Schema): AvroProjection = {
val projection = (record: GenericRecord) => rewriteRecordWithNewSchema(record, schema)
// NOTE: Have to use explicit [[Projection]] instantiation to stay compatible w/ Scala 2.11
new AvroProjection {
override def apply(record: GenericRecord): GenericRecord =
if (record.getSchema == schema) {
record
} else {
projection(record)
}
}
}

}
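A rough usage sketch of the projection above (the schema below is hypothetical, not part of this patch): records already carrying the target schema are passed through untouched, anything else is rewritten into it.

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.AvroProjection

// Hypothetical pruned schema containing only the fields a query actually reads
val prunedSchema = SchemaBuilder.record("record").fields()
  .requiredString("key")
  .optionalLong("ts")
  .endRecord()

val projection = AvroProjection.create(prunedSchema)

// Records whose schema already equals `prunedSchema` are returned as-is;
// all others are rewritten via rewriteRecordWithNewSchema
def prune(record: GenericRecord): GenericRecord = projection(record)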
@@ -36,26 +36,29 @@ import org.apache.spark.sql.types.StructType
* [[BaseRelation]] implementation only reading Base files of Hudi tables, essentially supporting following querying
* modes:
* <ul>
* <li>For COW tables: Snapshot</li>
* <li>For MOR tables: Read-optimized</li>
* </ul>
*
* NOTE: The reason this Relation is used in liue of Spark's default [[HadoopFsRelation]] is primarily due to the
* NOTE: The reason this Relation is used in lieu of Spark's default [[HadoopFsRelation]] is primarily due to the
* fact that it injects the real partition path as the value of the partition field, which Hudi ultimately persists
* as part of the record payload. In some cases, however, the partition path might not necessarily be equal to the
* verbatim value of the partition path field (when a custom [[KeyGenerator]] is used), therefore leading to incorrect
* partition field values being written
*/
class BaseFileOnlyRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
optParams: Map[String, String],
userSchema: Option[StructType],
globPaths: Seq[Path])
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport {
case class BaseFileOnlyRelation(override val sqlContext: SQLContext,
Contributor Author
The primary change here is converting the class into a case class, which in turn entails that all of the ctor parameters become field values requiring the corresponding annotations

Contributor Author
The reason this class is converted to a case class is to avoid any in-place mutations and instead make updatePrunedDataSchema produce a new instance
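As a rough sketch of the intended flow (names below are illustrative, not part of the patch), the schema-pruning rule asks the relation for a fresh copy carrying the pruned schema rather than mutating shared state:

import org.apache.hudi.BaseFileOnlyRelation
import org.apache.spark.sql.types.StructType

// Illustrative only: derive a new relation carrying the pruned schema;
// the original relation instance is left untouched
def withPrunedSchema(relation: BaseFileOnlyRelation, prunedSchema: StructType): BaseFileOnlyRelation =
  relation.updatePrunedDataSchema(prunedSchema) // internally: this.copy(prunedDataSchema = Some(prunedSchema))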

override val metaClient: HoodieTableMetaClient,
override val optParams: Map[String, String],
private val userSchema: Option[StructType],
private val globPaths: Seq[Path],
private val prunedDataSchema: Option[StructType] = None)
extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema)
with SparkAdapterSupport {

case class HoodieBaseFileSplit(filePartition: FilePartition) extends HoodieFileSplit

override type FileSplit = HoodieBaseFileSplit
override type Relation = BaseFileOnlyRelation

// TODO(HUDI-3204) this is to override behavior (exclusively) for COW tables to always extract
// partition values from partition path
@@ -67,6 +70,9 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,

override lazy val mandatoryFields: Seq[String] = Seq.empty

override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
this.copy(prunedDataSchema = Some(prunedSchema))

override def imbueConfigs(sqlContext: SQLContext): Unit = {
super.imbueConfigs(sqlContext)
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
@@ -106,18 +112,16 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
}

protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[HoodieBaseFileSplit] = {
val partitions = listLatestBaseFiles(globPaths, partitionFilters, dataFilters)
val fileSplits = partitions.values.toSeq
.flatMap { files =>
files.flatMap { file =>
// TODO fix, currently assuming parquet as underlying format
HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession,
file = file,
partitionValues = getPartitionColumnsAsInternalRow(file)
)
}
}
val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
val fileSplits = fileSlices.flatMap { fileSlice =>
// TODO fix, currently assuming parquet as underlying format
val fs = fileSlice.getBaseFile.get.getFileStatus
HoodieDataSourceHelper.splitFiles(
sparkSession = sparkSession,
file = fs,
partitionValues = getPartitionColumnsAsInternalRow(fs)
)
}
// NOTE: It's important to order the splits in the reverse order of their
// size so that we can subsequently bucket them in an efficient manner
.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
@@ -244,10 +244,10 @@ object DefaultSource {
new IncrementalRelation(sqlContext, parameters, userSchema, metaClient)

case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
new MergeOnReadSnapshotRelation(sqlContext, parameters, userSchema, globPaths, metaClient)
new MergeOnReadSnapshotRelation(sqlContext, parameters, metaClient, globPaths, userSchema)

case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
new MergeOnReadIncrementalRelation(sqlContext, parameters, userSchema, metaClient)
new MergeOnReadIncrementalRelation(sqlContext, parameters, metaClient, userSchema)

case (_, _, true) =>
new HoodieBootstrapRelation(sqlContext, userSchema, globPaths, metaClient, parameters)